<?php
namespace app\Console;

use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Db;


//https://www.kancloud.cn/chunice/think-swoole/266286
//异步多任务处理
class Update extends Command
{
    protected $server;
    public $fields = [  //这些字段每个页面有很多个,需要循环采集  这个属性中包含两个字段   title和url
                        'title' => [
                            'select' => '//*[@id="list"]/dl/dd/a',
                            'node'   => 'nodeValue',
                        ],
    ];

    // 命令行配置函数
    protected function configure()
    {
        // setName 设置命令行名称
        // setDescription 设置命令行描述
        $this->setName('upbook:start')->setDescription('Start thinkphp Task Server!');
    }

    // 设置命令返回信息
    protected function execute(Input $input, Output $output)
    {
        $this->server = new \swoole_server('0.0.0.0', 9502);

        // server 运行前配置
        $this->server->set([
            'worker_num'      => 1,
            'daemonize'       => false,  //守护进程
            'task_worker_num' => 2  # task 进程数
        ]);

        // 注册回调函数
        $this->server->on('Start', [$this, 'onStart']);
        $this->server->on('Connect', [$this, 'onConnect']);
        $this->server->on('Receive', [$this, 'onReceive']);
        $this->server->on('Task', [$this, 'onTask']);
        $this->server->on('Finish', [$this, 'onFinish']);
        $this->server->on('Close', [$this, 'onClose']);

        $this->server->start();
    }

    // 主进程启动时回调函数
    public function onStart(\swoole_server $server)
    {
        echo "Start\n";
    }

    // 建立连接时回调函数
    public function onConnect(\swoole_server $server, $fd, $from_id)
    {
        //echo "Connect\n";
    }

    // 收到信息时回调函数
    public function onReceive(\swoole_server $server, $fd, $from_id, $data)
    {
        $data = ltrim($data, 'GTE /?data=');
        $data = rtrim($data, "HTTP/1.1\r\nHost: 127.0.0.1:9502\r\nAccept: */*\r\n");
        $data = json_decode($data, true);
        // 投递异步任务
        $d = [];
        $d['fd'] = $fd;
        foreach($data as $k=>$v){
            $d['id'] = $v;
            $task_id = $server->task($d);  //投送成功返回任务id
        }

        //echo "Dispath AsyncTask: id={$task_id}\n";

        // 将受到的客户端消息再返回给客户端
        $server->send($fd, '收到消息');
        $server->close($fd, false);  //似乎连接会一直持续存在，不超时就不会断开   不知道swoole_http_server是不是也是这样

    }

    // 异步任务处理函数
    public function onTask(\swoole_server $server, $task_id, $from_id, $data)
    {
        $val = Db::name('bookname')->where('id', $data['id'])->field('id,bookname,url')->find();
        if(empty($val)){
            $server->finish("{$data['id']} -> 数据库查询失败");
        }else{
            $html = curl_get($val['url']);
            //curl到的数据跟数据库中的数组做差值，剩下的就是新增的
            $res = getConFromHtml3($html, $this->fields);

            if (empty($res)) {
                $server->finish("{$data['id']} -> 第一次采集失败");
            }else{
                $num1 = count($res);
                $table = switchTableById2($val['id']);

                $num2 = Db::name($table)->field('id')->where('bid', $val['id'])->count();
                $num = $num2 - $num1;
                $bookname = $val['bookname'];
                if ($num < 0) {

                    $arr = array_slice($res, $num); //更新的数据 title和url
                    $sql = 'insert into book_' . $table . '(bid,bookname,title,url) values';
                    foreach ($arr as $v) {
                        $sql .= '(' . $val['id'] . ',"' . $bookname . '","' . $v['title'] . '","' . $v['url'] . '"),';
                    }
                    $sql = rtrim($sql, ',');
                    $all = Db::execute($sql);
                    if($all){
                        $server->finish("{$data['id']} -> OK");
                    }else{
                        $server->finish("{$data['id']} -> 新增失败");
                    }
                }else{
                    $server->finish("{$data['id']} -> 不需要新增");
                }
            }
        }
    }

    // 异步任务完成通知 Worker 进程函数
    public function onFinish(\swoole_server $server, $task_id, $data)
    {
        $n = $server->stats();
        echo "AsyncTask[{$task_id}] Finish: {$data} 还剩{$n['tasking_num']}个任务正在排队\n";
    }

    // 关闭连时回调函数
    public function onClose(\swoole_server $server, $fd, $from_id)
    {
        //echo "Close\n";

        //dump($n);
        //Db::clear();
        //$server->send($fd, 1);
    }
}